---
title: "Security Best Practices"
description: "Secure your mcp_use implementations and protect sensitive data"
icon: "shield"
---

Security is crucial when working with MCP servers and LLM agents. This guide covers best practices for protecting your applications, data, and infrastructure.

<Warning>
**Important**: MCP servers can have powerful capabilities including file system access, network requests, and code execution. Always follow security best practices to protect your systems.
</Warning>

## API Key Management

### Environment Variables

Never hardcode API keys in your source code. Use environment variables:

```python secure_config.py
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Access API keys securely
openai_key = os.getenv("OPENAI_API_KEY")
anthropic_key = os.getenv("ANTHROPIC_API_KEY")

if not openai_key:
    raise ValueError("OPENAI_API_KEY environment variable is required")
```

### .env File Security

Keep local secrets in a `.env` file that stays out of version control:

```bash .env
# LLM Provider Keys
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GROQ_API_KEY=gsk_...

# MCP Server Configuration
FILESYSTEM_ROOT=/safe/workspace
DATABASE_URL=postgresql://user:pass@localhost/db

# Optional: Security settings
MCP_TIMEOUT=30
MAX_TOOL_CALLS=10
ALLOWED_DOMAINS=example.com,api.service.com
```

<Warning>
**Never commit .env files**: Add `.env` to your `.gitignore` file to prevent accidentally committing API keys to version control.
</Warning>
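A startup check can catch a `.env` file that is not ignored by Git or that other users can read. The sketch below is illustrative only: `check_env_file` is a hypothetical helper, it matches a few literal `.gitignore` entries rather than evaluating glob rules, and the permission check assumes a POSIX system.

```python
import stat
from pathlib import Path


def check_env_file(project_dir: str = ".") -> list[str]:
    """Return warnings about the .env file in project_dir (hypothetical helper)."""
    root = Path(project_dir)
    env_path = root / ".env"
    gitignore = root / ".gitignore"
    problems: list[str] = []

    if not env_path.exists():
        return problems  # Nothing to check

    # Literal comparison only; real .gitignore matching supports globs and negation
    ignored = set()
    if gitignore.exists():
        ignored = {line.strip() for line in gitignore.read_text().splitlines()}
    if not ignored & {".env", "/.env", ".env*", "*.env"}:
        problems.append(".env is not listed in .gitignore")

    # Any group/other permission bit means the file is not private (POSIX only)
    mode = env_path.stat().st_mode
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        problems.append(".env is accessible by group or other users")

    return problems
```

Calling `check_env_file()` before loading keys and refusing to start when it returns warnings keeps a misconfigured checkout from going unnoticed.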

### Secrets Management

For production environments, use proper secrets management:

<Tabs>
  <Tab title="AWS Secrets Manager">
    ```python
    import boto3
    from botocore.exceptions import ClientError

    def get_secret(secret_name, region_name="us-east-1"):
        session = boto3.session.Session()
        client = session.client('secretsmanager', region_name=region_name)

        try:
            response = client.get_secret_value(SecretId=secret_name)
            return response['SecretString']
        except ClientError:
            # Re-raise unchanged; add logging or error mapping here if needed
            raise

    # Usage
    openai_key = get_secret("prod/mcp-use/openai-key")
    ```
  </Tab>
  <Tab title="Azure Key Vault">
    ```python
    from azure.keyvault.secrets import SecretClient
    from azure.identity import DefaultAzureCredential

    credential = DefaultAzureCredential()
    client = SecretClient(
        vault_url="https://vault-name.vault.azure.net/",
        credential=credential
    )

    # Usage
    openai_key = client.get_secret("openai-api-key").value
    ```
  </Tab>
  <Tab title="HashiCorp Vault">
    ```python
    import os

    import hvac

    client = hvac.Client(url='https://vault.example.com')
    client.token = os.getenv('VAULT_TOKEN')

    # Read secret
    response = client.secrets.kv.v2.read_secret_version(
        path='mcp-use/api-keys'
    )
    openai_key = response['data']['data']['openai_key']
    ```
  </Tab>
</Tabs>

## MCP Server Security

### Filesystem Server Security

When using filesystem servers, restrict access to safe directories:

```json secure_filesystem_config.json
{
  "mcpServers": {
    "filesystem": {
      "command": "mcp-server-filesystem",
      "args": [
        "/workspace/safe-directory",
        "--readonly",
        "--max-file-size", "10MB",
        "--allowed-extensions", ".txt,.md,.json,.py"
      ],
      "env": {
        "FILESYSTEM_READONLY": "true",
        "MAX_FILE_SIZE": "10485760"
      }
    }
  }
}
```

### Network Access Restrictions

Limit network access for web-based MCP servers:

```json secure_network_config.json
{
  "mcpServers": {
    "playwright": {
      "command": "npx",
      "args": ["@playwright/mcp@latest"],
      "env": {
        "PLAYWRIGHT_HEADLESS": "true",
        "ALLOWED_DOMAINS": "example.com,api.trusted-service.com",
        "BLOCK_PRIVATE_IPS": "true",
        "DISABLE_JAVASCRIPT": "false",
        "TIMEOUT": "30000"
      }
    }
  }
}
```
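Environment variables such as `ALLOWED_DOMAINS` only help if the server actually reads them, which this guide does not confirm for any particular server. As a second layer, the application can check URLs itself before handing them to a tool. The following is a minimal sketch under that assumption; `is_url_allowed` is a hypothetical helper, and because it does not resolve DNS it cannot detect an allowed hostname that points at a private address.

```python
import ipaddress
from urllib.parse import urlparse

# Mirrors the ALLOWED_DOMAINS value from the config above
ALLOWED_DOMAINS = frozenset({"example.com", "api.trusted-service.com"})


def is_url_allowed(url: str, allowed_domains=ALLOWED_DOMAINS) -> bool:
    """Return True only for http(s) URLs on an allowed domain or one of its subdomains."""
    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower().rstrip(".")
    except ValueError:
        return False  # Malformed URL

    if parsed.scheme not in ("http", "https"):
        return False
    if not host:
        return False

    # Reject literal IP addresses outright; this covers loopback and private ranges
    try:
        ipaddress.ip_address(host)
        return False
    except ValueError:
        pass  # Not an IP literal, continue with the domain check

    # Match the domain itself or a subdomain, never a bare string prefix or suffix
    return any(host == d or host.endswith("." + d) for d in allowed_domains)
```

Matching on `"." + d` rather than a plain suffix is what keeps `example.com.evil.org` and `notexample.com` out.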

### Database Security

Secure database connections with proper credentials and restrictions:

```json secure_database_config.json
{
  "mcpServers": {
    "postgres": {
      "command": "mcp-server-postgres",
      "env": {
        "DATABASE_URL": "${DATABASE_URL}",
        "CONNECTION_TIMEOUT": "30",
        "MAX_CONNECTIONS": "5",
        "READONLY_MODE": "true",
        "ALLOWED_SCHEMAS": "public,reporting",
        "BLOCKED_TABLES": "users,passwords,secrets"
      }
    }
  }
}
```
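The `${DATABASE_URL}` placeholder keeps the credential out of the config file, but something has to substitute the real value before the server starts. Whether your config loader does this is an assumption worth verifying. If it does not, a small pre-processing step can be sketched as follows; `expand_env_placeholders` is a hypothetical helper, not part of any library.

```python
import os
import re

# Matches ${VAR_NAME} placeholders
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env_placeholders(value, env=None):
    """Recursively replace ${VAR} placeholders in strings, failing on unset variables."""
    env = os.environ if env is None else env

    if isinstance(value, dict):
        return {key: expand_env_placeholders(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env_placeholders(item, env) for item in value]
    if isinstance(value, str):
        def substitute(match):
            name = match.group(1)
            if name not in env:
                # Fail loudly instead of starting a server with an empty credential
                raise KeyError(f"Environment variable {name} is not set")
            return env[name]

        return _PLACEHOLDER.sub(substitute, value)
    return value  # Numbers, booleans, and None pass through unchanged
```

Load the JSON file with `json.load`, pass the resulting dictionary through this function, and hand the expanded dictionary to your client. Raising on a missing variable is a deliberate choice: a silent empty string would be harder to diagnose.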

## Agent Security Configuration

### Restrict Tool Access

Limit which tools the agent can use:

```python secure_agent.py
from mcp_use import MCPAgent, MCPClient
from langchain_openai import ChatOpenAI

# Define allowed and disallowed tools
ALLOWED_TOOLS = [
    "file_read",
    "file_write",
    "web_search",
    "web_scrape"
]

DISALLOWED_TOOLS = [
    "system_execute",
    "network_request",
    "database_write",
    "file_delete"
]

async def create_secure_agent():
    client = MCPClient.from_config_file("secure_config.json")
    llm = ChatOpenAI(model="gpt-4")

    agent = MCPAgent(
        llm=llm,
        client=client,
        allowed_tools=ALLOWED_TOOLS,
        disallowed_tools=DISALLOWED_TOOLS,
        max_steps=20,  # Limit execution steps
        timeout=300,   # 5-minute timeout
        use_server_manager=True
    )

    return agent
```

### Input Validation

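Validate user inputs before passing them to the agent. A pattern blocklist catches only obvious abuse, so treat it as one layer alongside tool restrictions and server-side limits: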

```python input_validation.py
import re
from typing import List, Optional

class InputValidator:
    def __init__(self):
        self.max_length = 1000
        self.blocked_patterns = [
            r'rm\s+-rf',          # Dangerous commands
            r'sudo',              # Privilege escalation
            r'chmod\s+777',       # Permission changes
            r'\.\./',             # Path traversal
            r'<script',           # XSS attempts
            r'DROP\s+TABLE',      # SQL injection
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Validate user query for security issues"""

        # Check length
        if len(query) > self.max_length:
            return False, f"Query too long (max {self.max_length} characters)"

        # Check for blocked patterns
        for pattern in self.blocked_patterns:
            if re.search(pattern, query, re.IGNORECASE):
                return False, f"Query contains blocked pattern: {pattern}"

        # Check for suspicious characters
        suspicious_chars = ['&', '|', ';', '`', '$']
        if any(char in query for char in suspicious_chars):
            return False, "Query contains potentially dangerous characters"

        return True, None

# Usage
validator = InputValidator()

async def secure_query_handler(user_query: str):
    is_valid, error = validator.validate_query(user_query)

    if not is_valid:
        raise ValueError(f"Invalid query: {error}")

    agent = await create_secure_agent()
    return await agent.run(user_query)
```

### Rate Limiting

Implement rate limiting to prevent abuse:

```python rate_limiting.py
import time
from collections import defaultdict
from typing import Dict, List, Tuple

class RateLimiter:
    def __init__(self, max_requests: int = 10, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, List[float]] = defaultdict(list)

    def is_allowed(self, user_id: str) -> Tuple[bool, str]:
        """Check if user is within rate limits"""
        now = time.time()
        window_start = now - self.window_seconds

        # Clean old requests
        self.requests[user_id] = [
            req_time for req_time in self.requests[user_id]
            if req_time > window_start
        ]

        # Check if under limit
        if len(self.requests[user_id]) >= self.max_requests:
            return False, f"Rate limit exceeded: {self.max_requests} requests per {self.window_seconds} seconds"

        # Record this request
        self.requests[user_id].append(now)
        return True, ""

# Usage
rate_limiter = RateLimiter(max_requests=5, window_seconds=60)

async def rate_limited_query(user_id: str, query: str):
    allowed, message = rate_limiter.is_allowed(user_id)

    if not allowed:
        raise ValueError(message)

    agent = await create_secure_agent()
    return await agent.run(query)
```

## Logging and Monitoring

### Security Logging

Log security-relevant events (agent starts, tool usage, violations, and errors) so incidents can be investigated later:

```python security_logging.py
import logging

class SecurityLogger:
    def __init__(self, log_file: str = "security.log"):
        self.logger = logging.getLogger("mcp_security")
        self.logger.setLevel(logging.INFO)

        # getLogger returns a shared instance, so attach the file handler only once;
        # otherwise a second SecurityLogger would duplicate every log entry
        if not self.logger.handlers:
            handler = logging.FileHandler(log_file)
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def log_agent_start(self, user_id: str, query: str):
        """Log when an agent starts processing"""
        self.logger.info(f"Agent started - User: {user_id}, Query: {query[:100]}...")

    def log_tool_usage(self, user_id: str, tool_name: str, success: bool):
        """Log tool usage attempts"""
        status = "SUCCESS" if success else "FAILED"
        self.logger.info(f"Tool used - User: {user_id}, Tool: {tool_name}, Status: {status}")

    def log_security_violation(self, user_id: str, violation_type: str, details: str):
        """Log security violations"""
        self.logger.warning(f"SECURITY VIOLATION - User: {user_id}, Type: {violation_type}, Details: {details}")

    def log_error(self, user_id: str, error: str):
        """Log errors"""
        self.logger.error(f"Error - User: {user_id}, Error: {error}")

# Usage
security_logger = SecurityLogger()

async def monitored_agent_run(user_id: str, query: str):
    security_logger.log_agent_start(user_id, query)

    try:
        agent = await create_secure_agent()
        result = await agent.run(query)
        security_logger.log_tool_usage(user_id, "agent_complete", True)
        return result
    except Exception as e:
        security_logger.log_error(user_id, str(e))
        raise
```
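Because user queries are written to the log, a pasted API key or an embedded newline could end up in `security.log`. One way to reduce that risk is a `logging.Filter` that masks key-like strings and flattens line breaks. This is a sketch under assumptions: `RedactSecretsFilter` is a hypothetical name, and the pattern covers only the key prefixes shown in the `.env` example (`sk-`, `sk-ant-`, `gsk_`).

```python
import logging
import re

# Key prefixes taken from the .env example; extend for other providers
_SECRET_PATTERN = re.compile(r"\b(sk-ant-|sk-|gsk_)[A-Za-z0-9_\-]{8,}")


class RedactSecretsFilter(logging.Filter):
    """Mask key-like strings and strip newlines before a record is written."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the final message first so arguments are covered too
        message = record.getMessage()
        # Keep one event per line so injected line breaks cannot fake log entries
        message = message.replace("\r", " ").replace("\n", " ")
        record.msg = _SECRET_PATTERN.sub(lambda m: m.group(1) + "[REDACTED]", message)
        record.args = ()
        return True  # Never drop the record, only rewrite it
```

Attach it with `handler.addFilter(RedactSecretsFilter())` where the file handler is created. Keeping the prefix in the output shows which kind of key leaked without exposing it.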

### Monitoring Dashboard

Expose metrics for security events so they can be scraped and alerted on:

```python monitoring.py
from prometheus_client import Counter, Histogram, start_http_server
import time

# Metrics. Labels are kept low-cardinality: a per-user label would create one
# time series per user, so user-level detail belongs in the security log instead
REQUEST_COUNT = Counter('mcp_requests_total', 'Total requests', ['status'])
REQUEST_DURATION = Histogram('mcp_request_duration_seconds', 'Request duration')
SECURITY_VIOLATIONS = Counter('mcp_security_violations_total', 'Security violations', ['type'])

async def monitored_agent_execution(user_id: str, query: str):
    start_time = time.time()

    try:
        # Your existing security checks (validator and rate_limiter from the sections above)
        is_valid, error = validator.validate_query(query)
        if not is_valid:
            SECURITY_VIOLATIONS.labels(type='invalid_query').inc()
            raise ValueError(error)

        allowed, message = rate_limiter.is_allowed(user_id)
        if not allowed:
            SECURITY_VIOLATIONS.labels(type='rate_limit').inc()
            raise ValueError(message)

        # Execute agent
        agent = await create_secure_agent()
        result = await agent.run(query)

        REQUEST_COUNT.labels(status='success').inc()
        return result

    except Exception:
        REQUEST_COUNT.labels(status='error').inc()
        raise
    finally:
        REQUEST_DURATION.observe(time.time() - start_time)

# Start metrics server
start_http_server(8000)
```

## Production Deployment Security

### Container Security

Use secure container configurations:

```dockerfile Dockerfile
FROM python:3.9-slim

# Create non-root user
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser

# Install security updates
RUN apt-get update && apt-get upgrade -y && \
    apt-get install -y --no-install-recommends \
    ca-certificates && \
    rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set ownership and permissions
RUN chown -R mcpuser:mcpuser /app
USER mcpuser

# Expose port
EXPOSE 8000

# Run application
CMD ["python", "main.py"]
```

### Network Security

Configure network policies and firewalls:

```yaml kubernetes_network_policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: mcp-use-policy
spec:
  podSelector:
    matchLabels:
      app: mcp-use
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: frontend
    ports:
    - protocol: TCP
      port: 8000
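  egress:
  - to: []
    ports:
    - protocol: TCP
      port: 443  # HTTPS only
  # Allow DNS lookups; without this rule, hostname resolution fails
  - ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53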
  - to:
    - namespaceSelector:
        matchLabels:
          name: database
    ports:
    - protocol: TCP
      port: 5432
```

## Security Checklist

<AccordionGroup>
  <Accordion title="API Key Security">
    - [ ] API keys stored in environment variables or secrets manager
    - [ ] No hardcoded credentials in source code
    - [ ] .env files added to .gitignore
    - [ ] Regular API key rotation implemented
    - [ ] Least privilege access for API keys
  </Accordion>

  <Accordion title="MCP Server Security">
    - [ ] Filesystem access restricted to safe directories
    - [ ] Network access limited to necessary domains
    - [ ] Database connections use read-only accounts where possible
    - [ ] Input validation on all server parameters
    - [ ] Resource limits configured (timeouts, file sizes, etc.)
  </Accordion>

  <Accordion title="Agent Configuration">
    - [ ] Tool access restricted using allowed/disallowed lists
    - [ ] Maximum execution steps limited
    - [ ] Timeouts configured for agent operations
    - [ ] Input validation implemented
    - [ ] Rate limiting in place
  </Accordion>

  <Accordion title="Monitoring & Logging">
    - [ ] Security events logged
    - [ ] Monitoring dashboard configured
    - [ ] Alerting set up for security violations
    - [ ] Log retention policies in place
    - [ ] Regular security audits scheduled
  </Accordion>
</AccordionGroup>

## Common Security Vulnerabilities

### Path Traversal Prevention

```python
from pathlib import Path

def secure_file_path(base_dir: str, user_path: str) -> str:
    """Safely resolve user-provided file paths"""
    base = Path(base_dir).resolve()
    target = (base / user_path).resolve()

    # Ensure the target is within the base directory. Compare path components,
    # not string prefixes: "/workspace-other" starts with "/workspace" but lies outside it
    if target != base and base not in target.parents:
        raise ValueError("Path traversal attempt detected")

    return str(target)

# Usage in MCP server configuration
user_provided_path = "reports/summary.md"  # Example input
safe_workspace = secure_file_path("/workspace", user_provided_path)
```

### Command Injection Prevention

```python
from typing import List

def secure_command_args(command: str, args: List[str]) -> List[str]:
    """Safely construct command arguments"""
    # Allowlist of permitted commands
    allowed_commands = ["node", "python", "npm", "pip"]

    if command not in allowed_commands:
        raise ValueError(f"Command '{command}' not allowed")

    # Return an argument list for subprocess.run(..., shell=False). No quoting is
    # needed in that case: shlex.quote is meant for building a shell command string
    # and would add literal quote characters to list arguments
    return [command, *args]
```
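The allowlist only protects you if the command is then executed without a shell. A minimal sketch of that execution step follows, assuming the command is launched from Python with `subprocess`. `run_allowed_command` and its `allowed` parameter are hypothetical names introduced for illustration.

```python
import subprocess
from typing import Collection, List

ALLOWED_COMMANDS = frozenset({"node", "python", "npm", "pip"})


def run_allowed_command(
    command: str,
    args: List[str],
    allowed: Collection[str] = ALLOWED_COMMANDS,
    timeout: float = 30.0,
) -> subprocess.CompletedProcess:
    """Run an allowlisted command with arguments passed as a list, never via a shell."""
    if command not in allowed:
        raise ValueError(f"Command '{command}' not allowed")

    # With shell=False each list element reaches the program as one literal argument,
    # so characters such as ';', '|', and '$' are never interpreted
    return subprocess.run(
        [command, *args],
        shell=False,
        capture_output=True,
        text=True,
        timeout=timeout,
        check=False,
    )
```

An argument such as `"a; rm -rf /"` arrives at the child process as a single string. Note that the allowlist still permits interpreters like `python` and `node`, so the arguments themselves need the same scrutiny as the command name.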

## Next Steps

<CardGroup cols={3}>
  <Card title="Configuration Guide" icon="gear" href="/getting-started/configuration">
    Learn secure configuration practices for MCP servers
  </Card>
  <Card title="Deployment Guide" icon="rocket" href="/development">
    Best practices for secure production deployment
  </Card>
  <Card title="Troubleshooting" icon="warning" href="/troubleshooting/common-issues">
    Debug security-related issues and errors
  </Card>
</CardGroup>

<Tip>
Security is an ongoing process. Regularly review and update your security practices, monitor for new vulnerabilities, and keep all dependencies up to date.
</Tip>
